In 2021, I decided to make small investments in Bitcoin and Ethereum, the two major cryptocurrencies today. In addition to exploring the major cryptocurrencies as long-term stores of value, I am working to develop systematic approaches to make modest short-term returns for reinvestment.
One strategy is to:
(1) buy a cryptocurrency on days when the price will go up,
(2) sell a cryptocurrency on days when the price will go down, and
(3) do nothing otherwise
Here, I use machine learning and deep learning algorithms to predict day-to-day increases and decreases in Bitcoin's closing price.
While none of the machine/deep learning algorithms presented here outperform the buy-and-hold strategy (please see the Conclusion), this framework (with code here) allows one to rapidly evaluate strategy candidates.
(The Coinbase Pro cryptocurrency exchange platform is one place where one can buy and sell a variety of cryptocurrencies. cbpro is an unofficial Python client for the Coinbase Pro API that can be used to automate trades based on predictive modeling.)
yfinance is used to download market data from Yahoo! Finance.
pytrends is an unofficial API that is used to download data from Google Trends.
talib-binary is a Python wrapper for TA-Lib that is used to compute technical indicators used in financial market data analysis.
backtrader and pyfolio are used in the analyses and visualizations of trading strategies.
# Installing Python packages
import sys
!{sys.executable} -m pip install yfinance
!{sys.executable} -m pip install pytrends
!{sys.executable} -m pip install talib-binary # TA-Lib
!{sys.executable} -m pip install backtrader
!{sys.executable} -m pip install pyfolio
# File management
import os
# Working with DataFrames
import pandas as pd
# For manipulating numbers and datetime objects
import numpy as np
import datetime as dt
from collections import Counter
# Round half up at the given decimal precision
# (unlike Python's built-in round(), which rounds halves to even)
altround = \
lambda num, prec=0: \
np.floor(num*(10**prec))/(10**prec) \
+ (10**-prec)*((np.mod(num, 10**-prec) >= 0.5*(10**-prec)))
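As a sanity check, the helper rounds halves up, unlike Python's built-in round(), which rounds halves to even (the lambda is restated here so the snippet runs on its own):

```python
import numpy as np

# Restated from above: round half up at a given decimal precision
altround = \
lambda num, prec=0: \
np.floor(num*(10**prec))/(10**prec) \
+ (10**-prec)*((np.mod(num, 10**-prec) >= 0.5*(10**-prec)))

print(altround(2.5))  # 3.0, whereas round(2.5) gives 2
print(altround(2.4))  # 2.0
```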
# For plotting
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# For downloading, analyzing, and visualizing financial data
import yfinance as yf
from pytrends.request import TrendReq
pytrends = TrendReq()
from talib import RSI, BBANDS, MACD
import backtrader as bt
from backtrader.feeds import PandasData
import pyfolio as pf
# For machine learning modeling
from sklearn import linear_model
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
# For deep learning
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import Dense, LSTM, Dropout, Input, Conv1D
from tensorflow.keras.layers import BatchNormalization, ReLU, GlobalAveragePooling1D
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
# Ignore warnings
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.filterwarnings('ignore')
Open: price at 12:01 AM UTC
High: highest price of the 24-hour period
Low: lowest price of the 24-hour period
Close: price at 11:59 PM UTC of a given day
Volume: amount traded in the 24-hour period
# Ticker symbol for Bitcoin
ticker_symbol = 'btc-usd'
# For saving data in CSV format
data_path = 'working_data'
if not os.path.isdir(data_path):
os.mkdir(data_path)
data_path = os.path.join(data_path, f'{ticker_symbol.upper()}.csv')
# Price history range of dates
start = dt.datetime(2010, 1, 1)
end = dt.datetime(2022, 2, 24) # dt.datetime.now()
# Specify if data should be read from a file if already downloaded
new_download = False
if not new_download and os.path.exists(data_path):
with open(data_path) as f:
ticker_hist = pd.read_csv(data_path, index_col='Date')
ticker_hist.index = pd.to_datetime(ticker_hist.index)
else:
# Download data
ticker_hist = yf.download(ticker_symbol,
progress=False,
actions=True,
start=start,
end=end)
# Save file for later use
ticker_hist.to_csv(data_path)
# Display the price history for the first and last few dates
num_dates_disp = 5
display(ticker_hist.head(num_dates_disp))
display(ticker_hist.tail(num_dates_disp))
A score between 0 and 100 is assigned to time periods based on the degree of Google search interest.
if ticker_symbol in ['btc-usd']:
# Assign an appropriate Google search term to the ticker symbol
if ticker_symbol == 'btc-usd':
term = 'Bitcoin'
data_path = 'working_data'
data_path = os.path.join(data_path, f'{term}_google_trends.csv')
# Specify if data should be read from a file if already downloaded
new_download = False
if not new_download and os.path.exists(data_path):
# Read from file if downloaded already
with open(data_path) as f:
ttrends = pd.read_csv(data_path, index_col='date')
ttrends.index = pd.to_datetime(ttrends.index)
else:
# Create an empty dataframe and update it with downloaded data
ttrends = pd.DataFrame()
# The data needs to be downloaded in approximately 9-month
# batches in order to access daily Google Trends scores;
# otherwise, the scores are weekly
DAYS_IN_YEAR = 365.25
MAX_RANGE_FOR_DAILY_INFO = 0.73 # 0.73 years
# (approximately 9 months)
num_yrs = (end - start).days / DAYS_IN_YEAR
utc = dt.datetime.utcnow()
for i in range(int(np.ceil(num_yrs/MAX_RANGE_FOR_DAILY_INFO))):
start_ = utc - dt.timedelta(
days=DAYS_IN_YEAR*min((i+1)*MAX_RANGE_FOR_DAILY_INFO,
num_yrs))
start_ = start_.date()
end_ = utc - dt.timedelta(
days=DAYS_IN_YEAR*i*MAX_RANGE_FOR_DAILY_INFO)
end_ = end_.date()
kw_list = [term]
pytrends.build_payload(kw_list, timeframe=f'{start_} {end_}')
try:
df = pytrends.interest_over_time()
except Exception as e:
print(e)
print('This is probably because the ' \
'data limit has been reached.')
break
ttrends = pd.concat([df.dropna(), ttrends])
ttrends[term] = ttrends[term].apply(lambda x: x+1)
rows_to_delete = []
date_counts = Counter(ttrends.index)
for k, v in date_counts.items():
if v == 2:
p = np.where(ttrends.index == k)[0]
rows_to_delete.append(p[1])
ttrends.loc[ttrends.index[:p[1]], (term)] =\
ttrends.loc[ttrends.index[:p[1]], (term)]\
* (ttrends[term][p[1]]/ttrends[term][p[0]])
ttrends = ttrends[~ttrends.index.duplicated(keep='first')]
ttrends[term] = ttrends[term] * (100/max(ttrends[term]))
ttrends[term] = ttrends[term].apply(lambda x: int(altround(x)))
# Save file for later use
ttrends.to_csv(data_path)
# Remove partial data
ttrends = ttrends[~ttrends['isPartial']]
# Approximate the scores of the last few dates (present in the price
# data) with the score of the most recent available Google Trends date
while ttrends.tail(1).index[0] + dt.timedelta(1) <= end.date():
# DataFrame.append is deprecated in recent pandas; use pd.concat
ttrends = pd.concat([
ttrends,
pd.DataFrame(
np.array(ttrends.tail(1)),
columns=ttrends.columns,
index=[ttrends.tail(1).index[0] + dt.timedelta(1)]
)
])
# Display the data for the first and last few dates
display(ttrends.head(num_dates_disp))
display(ttrends.tail(num_dates_disp))
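The batch-stitching above works, in spirit, like the following toy example: each downloaded batch is normalized to its own 0-100 scale, so the older batch is rescaled by the ratio of the two scores on a shared date before the duplicate is dropped and the whole series is renormalized (the dates and scores here are made up):

```python
# Two toy batches sharing one date ('d3'), each on its own 0-100 scale
older = {'d1': 50, 'd2': 100, 'd3': 25}
newer = {'d3': 100, 'd4': 60, 'd5': 80}

# Rescale the older batch so the shared date agrees with the newer batch
ratio = newer['d3'] / older['d3']  # 4.0
stitched = {d: s * ratio for d, s in older.items()}
stitched.update(newer)  # keep the newer batch's values on the overlap

# Renormalize the stitched series back to a 0-100 scale
peak = max(stitched.values())
stitched = {d: 100 * s / peak for d, s in stitched.items()}
print(stitched['d2'], stitched['d5'])  # 100.0 20.0
```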
Use the range selector below the main plot to control the range of dates.
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# include candlestick with range selector
fig.add_trace(go.Candlestick(x=ticker_hist.index,
open=ticker_hist['Open'],
high=ticker_hist['High'],
low=ticker_hist['Low'],
close=ticker_hist['Adj Close'],
name='price'),
secondary_y=True)
# include a go.Bar trace for volumes
fig.add_trace(go.Bar(x=ticker_hist.index,
y=ticker_hist['Volume'],
marker_color='purple',
name='volume'),
secondary_y=False)
fig.layout.yaxis2.showgrid=False
# Set y-axes labels
fig.update_yaxes(title_text='price', secondary_y=True)
fig.update_yaxes(title_text='volume', secondary_y=False)
fig.update_layout(
title=f'{ticker_symbol.upper()}'
)
fig.show()
plt.style.use("classic")
ax = ttrends.plot.line(y=term, use_index=True, title='Google Trends')
ax.set_ylabel('score')
plt.show()
The daily log ratio and direction (sign) of change for the opening price, highest price, lowest price, closing price, volume, and Google Trends score are calculated.
Rolling averages (up to a period of 10 days) of the directional change of the closing price are also calculated.
Finally, technical indicators such as the Relative Strength Index (RSI), Bollinger Bands, and Moving Average Convergence Divergence (MACD) are calculated.
The RSI has a value between 0 and 100, and measures the magnitude of recent price changes to evaluate overbought or oversold conditions in the price of an asset.
The Bollinger Bands are typically set 2 standard deviations above and below a 20-day simple moving average, and capture periods of increased and decreased volatility.
The MACD is calculated by subtracting the 26-period exponential moving average from the 12-period exponential moving average. A 9-day exponential moving average of the MACD, called the signal line, is subtracted from the MACD to get the histogram. The MACD falling below or rising above the signal line can be indicative of bearish (falling price) or bullish (rising price) signals, respectively.
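The MACD arithmetic can be sketched with pandas alone; this is only an approximation of the TA-Lib call used below, since TA-Lib seeds its exponential moving averages differently, so values near the start of a series will not match exactly:

```python
import numpy as np
import pandas as pd

# Toy rising closing-price series
close = pd.Series(np.linspace(100, 150, 60))

# MACD = 12-period EMA minus 26-period EMA
ema_fast = close.ewm(span=12, adjust=False).mean()
ema_slow = close.ewm(span=26, adjust=False).mean()
macd = ema_fast - ema_slow

# Signal line: 9-period EMA of the MACD; histogram: MACD minus signal
signal = macd.ewm(span=9, adjust=False).mean()
hist = macd - signal

# For a steadily rising series, the fast EMA sits above the slow EMA
print(macd.iloc[-1] > 0)  # True
```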
Five prior days' worth of each selected feature (predictor) are used in predicting the closing price changes.
# Make a copy of price and volume data
data = ticker_hist.copy()
# Specify if Google Trends data should be combined with
# price and volume data
include_Google_Trends = True
if include_Google_Trends:
data = data.join(ttrends[[term]], how='inner')
data = data.rename(columns = {term: 'Google Trends Score'})
# Drop columns and rename others for the analysis
data.drop(['Close','Dividends','Stock Splits'], inplace=True, axis=1)
data.rename(columns = {'Open':'open',
'High':'high',
'Low':'low',
'Adj Close':'close',
'Volume':'volume'},
inplace=True)
orig_cols = data.columns
for asset_info in orig_cols:
# Calculate daily log ratio for:
# * opening price
# * highest price
# * lowest price
# * closing price
# * volume
# * Google Trends score
col_log_ratio = f'{asset_info[0].lower()}_log_ratio'
if asset_info == 'Google Trends Score':
data[col_log_ratio] = \
data[asset_info].apply(
lambda x: x+1 if x==0 else x
)
else:
data[col_log_ratio] = data[asset_info]
data[col_log_ratio] = \
np.log(
data[col_log_ratio] /
data[col_log_ratio].shift(1)
)
# Calculate direction (sign) of change for:
# * opening price
# * highest price
# * lowest price
# * closing price
# * volume
# * Google Trends score
col_dir = f'{asset_info[0].lower()}_dir'
data[col_dir] = 0
data[col_dir].iloc[1:] = \
np.sign(
data[col_log_ratio].iloc[1:]
).astype(int)
# Investment strategy:
# * Buy if positive day-to-day return is predicted (1)
# * Sell if a loss is predicted (-1)
# * Otherwise maintain current position (0)
asset_info_tgt = 'close'
col_log_ratio = f'{asset_info_tgt[0].lower()}_log_ratio'
col_dir = f'{asset_info_tgt[0].lower()}_dir'
return_threshold = 0 # between 0 and 1 inclusive
data['strategy'] = data[col_dir]
if return_threshold > 0:
data['strategy'][
((data[col_log_ratio]>0)&
(data[col_log_ratio]<=np.log(1+return_threshold))
)
] = 0
# Simplify {1,-1,0}-strategy (action-based representation)
# to {1,-1}-strategy (state-based representation) by mapping
# 0 to 1 or to -1 based on the previous trading position
def simplify_strategy(df, col):
''' Simplified strategy format for machine learning modeling
'''
pos = -1
for i, p in enumerate(df[col]):
if p == 0:
# Write via .loc to avoid chained assignment
df.loc[df.index[i], col] = pos
else:
pos = p
simpl_strat = True
if simpl_strat:
data['sstrategy'] = data['strategy']
simplify_strategy(data, 'sstrategy')
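The state-based simplification can be checked on a plain list (a self-contained re-implementation of the same loop, since simplify_strategy above mutates a DataFrame column in place):

```python
def simplify(positions, start_pos=-1):
    # Replace each 0 (hold) with the most recent nonzero position
    out, pos = [], start_pos
    for p in positions:
        if p == 0:
            out.append(pos)
        else:
            pos = p
            out.append(p)
    return out

print(simplify([1, 0, -1, 0, 0]))  # [1, 1, -1, -1, -1]
```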
# Specify data lags
max_roll_avg = 10
roll_avg_spacing = 1
min_roll_avg = roll_avg_spacing
roll_avgs = list(range(min_roll_avg, max_roll_avg+1, roll_avg_spacing))
# Include rolling averages of closing price trends
for r in roll_avgs:
col = f'{asset_info_tgt[0].lower()}_{r}_day_trend'
data[col] = data[col_dir].rolling(r).mean()
# Include technical indicators
#
# Compute Relative Strength Index (RSI)
rsi_period = 14
data['rsi'] = RSI(
data[asset_info_tgt],
timeperiod=rsi_period
)
# Compute Bollinger Bands
bband_period = 20
high, mid, low = BBANDS(
data[asset_info_tgt],
timeperiod=bband_period
)
data = data.join(pd.DataFrame({'bb_high': high,
'bb_mid': mid,
'bb_low': low},
index=data.index))
data['bb_width'] = data['bb_high'] - data['bb_low']
asset_info = 'bb_width'
col_log_ratio = 'bbw_log_ratio'
data[col_log_ratio] = data[asset_info]
data[col_log_ratio] = \
np.log(
data[col_log_ratio] /
data[col_log_ratio].shift(1)
)
col_dir = 'bbw_dir'
data[col_dir] = 0
data[col_dir].iloc[bband_period:] = \
np.sign(
data[col_log_ratio].iloc[bband_period:]
).astype(int)
# Compute Moving Average Convergence Divergence (MACD)
macd_periods = [12, 26, 9]
data['macd'], data['macd_signal'], data['macd_hist'] = \
MACD(data[asset_info_tgt],
fastperiod=macd_periods[0],
slowperiod=macd_periods[1],
signalperiod=macd_periods[2]
)
tech_ind_info = ['rsi', 'bbw_log_ratio', 'macd_hist']
# Specify data lags
max_lag = 5
lag_spacing = 1
min_lag = lag_spacing
lags = list(range(min_lag, max_lag+1, lag_spacing))
# Lagged log ratios
for lag in lags:
for asset_info in orig_cols:
for chg_info in ['log_ratio', 'dir']:
col = f'{asset_info[0].lower()}_{chg_info}'
data[f'{col}_lag{lag}'] = data[col].shift(lag)
for r in roll_avgs:
col = f'{asset_info_tgt[0].lower()}_{r}_day_trend'
data[f'{col}_lag{lag}'] = data[col].shift(lag)
for col in tech_ind_info:
data[f'{col}_lag{lag}'] = data[f'{col}'].shift(lag)
# Remove NaN's
valid_days = max(1,
max(roll_avgs)-1,
max(rsi_period,
bband_period,
macd_periods[1]+macd_periods[2]-2)) \
+ max(lags)
data = data.iloc[valid_days:]
for lag in lags:
for asset_info in orig_cols:
col = f'{asset_info[0].lower()}_dir'
data[f'{col}_lag{lag}'] =\
data[f'{col}_lag{lag}'].astype(int)
# Display the data for the first and last few dates
display(data.head(num_dates_disp))
display(data.tail(num_dates_disp))
# Directory for saving results
if not os.path.isdir('results'):
os.mkdir('results')
# Visualizing Daily Returns, Google Trends Changes, and Technical Indicators
fig, ax = plt.subplots(4+include_Google_Trends, 1,
sharex=True,
figsize = (12,12))
fig.subplots_adjust(top=0.8)
col_log_ratio = f'{asset_info_tgt[0].lower()}_log_ratio'
i = 0
ax[i].plot(data[col_log_ratio])
ax[i].set(title = 'Daily Returns',
ylabel = 'log(returns)')
ax[i].grid(True)
if include_Google_Trends:
i += 1
asset_info = 'Google Trends Score'
col_log_ratio = f'{asset_info[0].lower()}_log_ratio'
ax[i].plot(data[col_log_ratio])
ax[i].set(title = f'{term} Daily Google Trends Changes',
ylabel = 'log(ratio)')
ax[i].grid(True)
i += 1
ax[i].plot(data['rsi'])
ax[i].set(title = 'Relative Strength Index',
ylabel = 'RSI')
ax[i].grid(True)
i += 1
ax[i].plot(data['bb_mid'])
ax[i].fill_between(data['bb_mid'].index,
data['bb_low'],
data['bb_high'],
color='blue',
alpha=0.1)
ax[i].set(title = 'Bollinger Bands',
ylabel = 'price')
ax[i].grid(True)
i += 1
ax[i].plot(data['macd'], label='MACD')
ax[i].plot(data['macd_signal'], label='Signal')
ax[i].plot(data['macd_hist'], label='Hist')
ax[i].set(title = 'Moving Average Convergence Divergence',
xlabel = 'date')
ax[i].legend(loc='upper left')
ax[i].grid(True)
fig.suptitle(f'{ticker_symbol.upper()} ' + \
f'({asset_info_tgt.capitalize()})',
x=0.1, y=1, fontsize=18);
plt.tight_layout()
plt.savefig(f'results/{ticker_symbol}_day_to_day', dpi=300)
The predictors selected for modeling are assembled and printed in the code below.
Training-validation-test splits of 0.6-0.2-0.2 are used for the deep learning models, while training-test splits of 0.8-0.2 are used for the other machine learning models. In all cases, the training data points come before the validation and test data points, and the validation data points come before the test data points. A probability decision threshold of 0.51 is used in each model decision for a price-increase prediction in the test data.
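The chronological split can be reproduced with scikit-learn's train_test_split by disabling shuffling, which keeps the earlier rows in the training set:

```python
from sklearn.model_selection import train_test_split

days = list(range(10))  # stand-in for ten trading days in order
train, test = train_test_split(days, test_size=0.2, shuffle=False)
print(train)  # [0, 1, 2, 3, 4, 5, 6, 7]
print(test)   # [8, 9]
```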
The Fully Convolutional Neural Network proposed here and defined here is used for the deep neural network (DNN). A long short-term memory (LSTM)-based architecture is used for the recurrent neural network (RNN).
The following models from scikit-learn were also used: logistic regression, Gaussian naive Bayes, a support vector classifier (SVC), a random forest classifier, and a multi-layer perceptron (MLP) classifier.
The following feature scaling approach selections are based on model properties (for instance, random forest classifiers are unaffected by feature scaling) as well as preliminary experiments:
The models are fit with the training (and validation) data and evaluated on the test data.
# Selecting predictors
predictors = []
for lag in lags:
for asset_info in orig_cols:
for chg_info in ['log_ratio', 'dir']:
if chg_info == 'dir':
continue
col = f'{asset_info[0].lower()}_{chg_info}'
predictor = f'{col}_lag{lag}'
predictors.append(predictor)
for r in roll_avgs:
col = f'{asset_info_tgt[0].lower()}_{r}_day_trend'
predictor = f'{col}_lag{lag}'
predictors.append(predictor)
for col in tech_ind_info:
predictor = f'{col}_lag{lag}'
predictors.append(predictor)
print(f'predictors: {predictors}')
# For seeding random number generators
seed = 0
# Split the data into training and test sets
test_size = 0.2
train, test = train_test_split(data,
test_size=test_size,
shuffle=False,
random_state=seed
)
print(f'\nNumber of trading days in training dataset: {len(train)}')
print(f'Number of trading days in test dataset: {len(test)}')
# Accounting for simplified strategy format
if simpl_strat:
strat_str = 'sstrategy'
else:
strat_str = 'strategy'
# For mapping strategy labels to and from non-negative integers
num_outputs = len(list(set(data[strat_str])))
def labeltoindex(labels):
labels_ = labels.copy()
unique_labels = sorted(list(set(labels_)))
labeltoindexmap = {}
for i, label in enumerate(unique_labels):
labeltoindexmap[label] = i
for i, label in enumerate(labels_):
labels_[i] = labeltoindexmap[label]
indextolabelmap = {value : key for (key, value) in labeltoindexmap.items()}
return labels_, indextolabelmap
def indextolabel(indices, indextolabelmap):
indices_ = indices.copy()
for i, index in enumerate(indices_):
indices_[i] = indextolabelmap[index]
indices_ = indices_.astype(int)
return indices_
indextolabelmap = labeltoindex(train[strat_str])[1]
# Setting model decision thresholds
if num_outputs == 2:
prob_thr = {1:0.51}
elif num_outputs == 3:
prob_thr = {1:0.6, 2:0.6}
default_class = 0
def prob_to_class(class_prob,
prob_thr=prob_thr,
default_class=default_class):
if prob_thr is None or default_class is None:
return np.argmax(class_prob, axis=1)
else:
pred = np.empty(len(class_prob))
pred[:] = np.nan
for i, c_p in enumerate(class_prob):
for c in sorted(prob_thr.items(),
key=lambda kv: kv[1],
reverse=True):
if c_p[c[0]] >= prob_thr[c[0]]:
pred[i] = c[0]
break
else:
pred[i] = default_class
return pred
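The thresholding can be exercised on toy probabilities (prob_to_class is restated here so the snippet runs on its own): with prob_thr = {1: 0.51}, a price increase is predicted only when its probability clears 0.51, and the default class is returned otherwise:

```python
import numpy as np

prob_thr = {1: 0.51}
default_class = 0

def prob_to_class(class_prob, prob_thr=prob_thr, default_class=default_class):
    if prob_thr is None or default_class is None:
        return np.argmax(class_prob, axis=1)
    pred = np.empty(len(class_prob))
    for i, c_p in enumerate(class_prob):
        # Check thresholded classes from highest to lowest threshold
        for c in sorted(prob_thr.items(), key=lambda kv: kv[1], reverse=True):
            if c_p[c[0]] >= prob_thr[c[0]]:
                pred[i] = c[0]
                break
        else:
            pred[i] = default_class
    return pred

probs = np.array([[0.60, 0.40],   # P(up) = 0.40 < 0.51 -> default class 0
                  [0.45, 0.55],   # P(up) = 0.55 >= 0.51 -> class 1
                  [0.50, 0.50]])  # P(up) = 0.50 < 0.51 -> default class 0
print(prob_to_class(probs))  # [0. 1. 0.]
```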
# Define functions to create neural networks
#
# Seeding random number generators
np.random.seed(seed)
tf.random.set_seed(seed)
def Fully_Convolutional_Network(input_shape):
input_layer = Input(input_shape)
conv1 = Conv1D(filters=64, kernel_size=3, padding="same")(input_layer)
conv1 = BatchNormalization()(conv1)
conv1 = ReLU()(conv1)
conv2 = Conv1D(filters=64, kernel_size=3, padding="same")(conv1)
conv2 = BatchNormalization()(conv2)
conv2 = ReLU()(conv2)
conv3 = Conv1D(filters=64, kernel_size=3, padding="same")(conv2)
conv3 = BatchNormalization()(conv3)
conv3 = ReLU()(conv3)
gap = GlobalAveragePooling1D()(conv3)
output_layer = Dense(num_outputs, activation="softmax")(gap)
return Model(inputs=input_layer, outputs=output_layer)
def simple_DNN():
model = Fully_Convolutional_Network(
input_shape=(len(lags), data[predictors].shape[1] // len(lags))
)
model.compile(
optimizer="adam",
loss="categorical_crossentropy",
metrics=["categorical_accuracy"],
)
return model
def simple_RNN():
model = Sequential()
model.add(LSTM(64, return_sequences=True,
input_dim=len(predictors) // len(lags)))
model.add(Dropout(0.2))
model.add(LSTM(64))
model.add(Dropout(0.2))
model.add(Dense(num_outputs, activation='softmax'))
model.compile(loss='categorical_crossentropy',
optimizer='adam',
metrics=['categorical_accuracy'])
return model
# Create a dictionary of selected models
models = {
'log_reg': linear_model.LogisticRegression(random_state=seed),
'gauss_nb': GaussianNB(),
'svm': SVC(random_state=seed, probability=True),
'random_forest': RandomForestClassifier(random_state=seed),
'MLP' : MLPClassifier(random_state=seed),
'dnn': simple_DNN(),
'rnn': simple_RNN(),
}
feature_scaling = {
'log_reg': 'normalization',
'gauss_nb': None,
'svm': 'standardization',
'random_forest': None,
'MLP' : 'standardization',
'dnn': 'normalization',
'rnn': None,
}
# Function that fits all models
def fit_models(data):
# Fit scalers and transform training data
feat_sclr_specs = list(set(feature_scaling.values()))
feat_sclr = {}
data_ = {}
for feat_sclr_spec in feat_sclr_specs:
if feat_sclr_spec == 'normalization':
feat_sclr[feat_sclr_spec] =\
MinMaxScaler().fit(data[predictors])
elif feat_sclr_spec == 'standardization':
feat_sclr[feat_sclr_spec] =\
StandardScaler().fit(data[predictors])
if feat_sclr_spec in ['normalization', 'standardization']:
data_[feat_sclr_spec] = \
feat_sclr[feat_sclr_spec]\
.transform(data[predictors])
else:
data_[feat_sclr_spec] = np.array(data[predictors])
if 'rnn' in models.keys() or 'dnn' in models.keys():
shape = data[predictors].shape
shape = (shape[0], len(lags), shape[1] // len(lags))
global history
history = {}
for model in models.keys():
if model in ['dnn', 'rnn']:
callbacks = [
ModelCheckpoint(
f"best_{model}_model.h5",
save_best_only=True,
monitor="val_loss",
verbose=False
),
ReduceLROnPlateau(
monitor="val_loss",
factor=0.5,
patience=20,
min_lr=0.0001
),
EarlyStopping(
monitor="val_loss",
patience=50,
verbose=False
),
]
history[model] = models[model].fit(
data_[feature_scaling[model]].reshape(shape),
to_categorical(
labeltoindex(data[strat_str])[0],
num_outputs
),
batch_size=32,
epochs=500,
callbacks=callbacks,
validation_split=min(0.5,test_size/(1-test_size)),
verbose=False
)
models[model] = load_model(f"best_{model}_model.h5")
print('')
else:
models[model].fit(
data_[feature_scaling[model]],
labeltoindex(data[strat_str])[0]
)
return feat_sclr
# Function that predicts (derives all
# position values) from the fitted models
def derive_positions(data, feat_sclr):
# Transform testing data
feat_sclr_specs = list(set(feature_scaling.values()))
data_ = {}
for feat_sclr_spec in feat_sclr_specs:
if feat_sclr_spec in ['normalization', 'standardization']:
data_[feat_sclr_spec] = \
feat_sclr[feat_sclr_spec]\
.transform(data[predictors])
else:
data_[feat_sclr_spec] = np.array(data[predictors])
if 'rnn' in models.keys() or 'dnn' in models.keys():
shape = data[predictors].shape
shape = (shape[0], len(lags), shape[1] // len(lags))
for model in models.keys():
if model in ['dnn', 'rnn']:
# Make predictions
data['pred_' + model] = \
indextolabel(
prob_to_class(
models[model].predict(
data_[feature_scaling[model]].reshape(shape)
)
),
indextolabelmap
)
# Evaluate deep learning model predictions
models[model].summary()
print(f'\nNumber of features (predictors): \
{models[model].inputs[0].shape[2]}')
print(f'\nNumber of time steps (lags): {len(lags)}')
print('Evaluate:')
models[model].evaluate(
data_[feature_scaling[model]].reshape(shape),
to_categorical(
labeltoindex(data[strat_str])[0],
num_outputs
)
);
print(f'Number of classes: \
{models[model].outputs[0].shape[1]}')
else:
# Make predictions
data['pred_' + model] = \
indextolabel(
prob_to_class(
models[model].predict_proba(
data_[feature_scaling[model]]
)
),
indextolabelmap
)
# Evaluate model predictions
print(f'{models[model]}')
print(f'Number of features (predictors): \
{models[model].n_features_in_}')
acc = accuracy_score(data[strat_str], data['pred_' + model])
print(f'Accuracy: {acc}')
classes = [indextolabelmap[_] for _ in models[model].classes_]
print(f'Classes: {classes}')
cm = confusion_matrix(data[strat_str], data['pred_' + model])
print(f'Confusion Matrix: \n{cm}\n')
# Fit the models
feature_scaler = fit_models(train)
# Derives all position values
derive_positions(test, feature_scaler)
Vectorized backtesting allows for a quick comparison of the trading strategies generated by the models (without eliminating short selling and without incorporating a trading commission).
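The core of the vectorized approach, yesterday's position multiplied by today's log return, can be sketched on made-up numbers:

```python
import numpy as np
import pandas as pd

# Made-up daily log returns and predicted positions (1 = long, -1 = short)
log_returns = pd.Series([0.02, -0.01, 0.03, -0.02])
positions = pd.Series([1, 1, -1, 1])

# Yesterday's position earns today's return; the first day has no prior position
strategy_log_returns = positions.shift(1) * log_returns

# Cumulative return: exponentiate the running sum of log returns
cumulative = strategy_log_returns.cumsum().apply(np.exp)
print(round(cumulative.iloc[-1], 4))  # 1.0408, i.e. exp(-0.01 + 0.03 + 0.02)
```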
# Function to evaluate trading strategies
def evaluate_strategies(data):
global strategy_rtn, pos_strategy
strategy_rtn, pos_strategy = [], []
# Trades are made on the open
asset_info_trd = 'open'
col_log_ratio =\
f'{asset_info_trd[0].lower()}_log_ratio'
for model in models.keys():
col_pred = 'pred_' + model
col_pos = 'pos_' + model
col_strat = 'strategy_' + model
data[col_pos] = data[col_pred]
simplify_strategy(data, col_pos)
data[col_strat] =\
data[col_pos].shift(1) * data[col_log_ratio]
strategy_rtn.append(col_strat)
pos_strategy.append(col_pos)
data['returns'] = data[col_log_ratio]
strategy_rtn.insert(0, 'returns')
# Evaluate all trading strategies by multiplying
# predicted positions by actual daily returns
evaluate_strategies(test)
# calculate total return and annualized volatility
# of each strategy
print('\nTotal Return: \n')
print(test[strategy_rtn].sum().apply(np.exp))
print('\nAnnualized Volatility: \n')
if ticker_symbol in ['btc-usd']:
print(test[strategy_rtn].std() * 365 ** 0.5)
# Number of trades over time
print('\nNumber of trades: \n')
print(((test[pos_strategy].diff()[1:]!=0).sum())
+(np.sum(test[pos_strategy]) == len(test[pos_strategy])))
# Visualize performance of trading strategies over time
ax = test[strategy_rtn].cumsum().apply(np.exp).plot(
figsize=(12, 6),
title = f'{ticker_symbol.upper()} ' \
'Comparison of Returns for each Strategy')
ax.set_ylabel('Cumulative Returns')
ax.grid(True);
plt.tight_layout();
plt.savefig(f'results/{ticker_symbol}_vectorized_backtesting', dpi=300)
Backtrader allows a more in-depth analysis of each trading strategy. There is no short selling in the setup here. The initial amount invested is 1000 USD. The trading commission is 0.5 percent, which is the maximum fee for each transaction on Coinbase.
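The "all-in" crypto sizing used in next_open() below works out as follows on illustrative numbers (the cash balance and open price here are assumptions, not values from the backtest):

```python
# Illustrative numbers (assumed): 1000 USD cash, 0.5% commission,
# 40,000 USD open price, 8 decimal places of Bitcoin precision
cash = 1000.0
commission = 0.005
open_price = 40000.0
vol_prec = 8

# Reserve the commission first, then truncate to 8 decimals of BTC,
# mirroring the sizing expression in the strategy's next_open()
size = int((1 - commission) * cash * (10**vol_prec) / open_price) / (10**vol_prec)
print(size)  # about 0.024875 BTC
```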
The focus here is on the Deep Neural Network (DNN) strategy, which is the best-performing non-buy-and-hold strategy. A timeline and a figure show trading information for the DNN strategy.
Additionally, there is some groundwork for a comparison between the DNN and buy-and-hold strategies in a subsequent analysis with pyfolio.
if ticker_symbol in ['btc-usd']:
asset_type = 'crypto'
if ticker_symbol == 'btc-usd':
vol_prec = 8 # Bitcoin decimal place precision
# Class to include columns in addition to price and volume data
class SignalData(PandasData):
"""
Define pandas DataFrame structure
"""
cols = ['predicted']
# create lines
lines = tuple(cols)
# define parameters
params = {c: -1 for c in cols}
params.update({'datetime': None})
params = tuple(params.items())
# Define backtesting strategy class
class Strategy(bt.Strategy):
params = (
('verbose', True),
('logging', False),
('show_tech_ind', False),
('rsi_period', 14),
('bband_period', 20),
('macd_periods', (12, 26, 9)),
('atr_period', 14),
('wma_period', 30),
('ema_period', 30),
('pfast', 50),
('pslow', 200),
)
def __init__(self):
# keep track of open, closing prices and predicted value in the series
self.data_predicted = self.datas[0].predicted
self.data_open = self.datas[0].open
self.data_close = self.datas[0].close
# keep track of pending orders/buy price/buy commission
self.order = None
self.price = None
self.comm = None
# For logging profit and loss
if self.params.logging:
self.log_pnl = []
# Technical indicators to be plotted
if self.params.show_tech_ind:
# Relative Strength Index (RSI)
rsi = bt.indicators.RSI(
self.datas[0],
period=self.params.rsi_period,
plotname='Relative Strength Index'
)
bt.indicators.SmoothedMovingAverage(
rsi,
period=10,
plotname='Smoothed Relative Strength Index'
)
# Bollinger Bands
bt.indicators.BollingerBands(
self.datas[0],
period=self.params.bband_period,
plotname='Bollinger Bands',
subplot=True
)
# Moving Average Convergence Divergence
bt.indicators.MACDHisto(
self.datas[0],
period_me1=self.params.macd_periods[0],
period_me2=self.params.macd_periods[1],
period_signal=self.params.macd_periods[2],
plotname='Moving Average Convergence Divergence'
)
# Average True Range
bt.indicators.ATR(
self.datas[0],
period=self.params.atr_period,
plotname='Average True Range'
)
# Stochastic Oscillator
bt.indicators.StochasticSlow(
self.datas[0]
)
# Weighted and Exponential Moving Averages
bt.indicators.WeightedMovingAverage(
self.datas[0],
period=self.params.wma_period,
subplot=True
)
bt.indicators.ExponentialMovingAverage(
self.datas[0],
period=self.params.ema_period,
subplot=True
)
# Moving Average Crossovers
self.slow_sma = bt.indicators.SimpleMovingAverage(
self.datas[0],
period=self.params.pslow
)
self.fast_sma = bt.indicators.SimpleMovingAverage(
self.datas[0],
period=self.params.pfast
)
self.crossover = bt.indicators.CrossOver(
self.fast_sma,
self.slow_sma
)
# logging function
def log(self, txt, dt=None):
'''Logging function'''
if self.params.verbose:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()} {txt}')
if self.params.logging:
dt = dt or self.datas[0].datetime.date(0)
self.log_pnl.append(f'{dt.isoformat()} {txt}')
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
# order already submitted/accepted - no action required
return
# report executed order
if order.status in [order.Completed]:
if order.isbuy():
self.log(f'BUY EXECUTED --- \
Price: {order.executed.price:.2f}, \
Cost: {order.executed.value:.2f}, \
Commission: {order.executed.comm:.2f}')
self.price = order.executed.price
self.comm = order.executed.comm
else:
self.log(f'SELL EXECUTED --- \
Price: {order.executed.price:.2f}, \
Cost: {order.executed.value:.2f}, \
Commission: {order.executed.comm:.2f}')
self.bar_executed = len(self)
# report failed order
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('Order Failed')
# set no pending order
self.order = None
def notify_trade(self, trade):
if not trade.isclosed:
return
self.log(f'OPERATION RESULT --- \
Gross: {trade.pnl:.2f}, \
Net: {trade.pnlcomm:.2f}')
# We have set cheat_on_open = True. This means that we
# calculate the signals on day t's close price, but
# calculate the number of shares we wanted to buy
# based on day t+1's open price.
def next_open(self):
# Check if an order is pending.
# If yes, we cannot send a 2nd one
if self.order:
return
# Check if we are in the market
if not self.position:
# Not yet ... we MIGHT BUY if ...
if self.data_predicted > 0:
# calculate the max number of shares ('all-in')
if asset_type == 'stocks':
size = \
int(
(1 - self.broker.comminfo[None].p.commission) * \
self.broker.getcash() /
self.datas[0].open
)
elif asset_type == 'crypto':
size = \
int(
(1 - self.broker.comminfo[None].p.commission) * \
self.broker.getcash() * (10**vol_prec) /
self.datas[0].open
) / (10**vol_prec)
# buy order
self.log(f'BUY CREATED --- \
Size: {size}, \
Cash: {self.broker.getcash():.2f}, \
Open: {self.data_open[0]}, \
Close: {self.data_close[0]}')
self.order = self.buy(size=size)
else:
if self.data_predicted < 0:
# sell order
self.log(f'SELL CREATED --- \
Size: {self.position.size}')
self.order = self.sell(size=self.position.size)
def stop(self):
if self.params.logging:
with open(
f'results/{ticker_symbol}_' +
f'{selected_model}_log.csv',
'w'
) as e:
for line in self.log_pnl:
e.write(line + '\n')
strategies = {0: 'log_reg',
1: 'gauss_nb',
2: 'svm',
3: 'random_forest',
4: 'MLP',
5: 'dnn',
6: 'rnn',
7: 'buy_and_hold',
8: 'tgt_strategy'
}
strategy_info = {}
strategy_vs_benchmark = [7, 5] # This order is important
# for future results
for s in strategy_vs_benchmark:
selected_model = strategies[s]
if selected_model == 'tgt_strategy':
test['predicted'] = test[strat_str]
elif selected_model == 'buy_and_hold':
test['predicted'] = np.ones(len(test))
else:
test['predicted'] = test['pred_' + selected_model]
if selected_model != 'buy_and_hold':
display(test.head(num_dates_disp))
display(test.tail(num_dates_disp))
# instantiate SignalData class
bttest = SignalData(dataname=test)
# instantiate Cerebro,
# add strategy, data, initial cash, commission
# and pyfolio for performance analysis
cerebro = bt.Cerebro(stdstats=True, cheat_on_open=True)
if selected_model == 'buy_and_hold':
verbose = False
logging = False
else:
verbose = True
logging = True
cerebro.addstrategy(Strategy,
verbose=verbose,
logging=logging,
show_tech_ind=False)
cerebro.adddata(bttest, name=ticker_symbol.upper())
cerebro.broker.setcash(1000.0)
cerebro.broker.setcommission(commission=0.005)  # at most 0.5% per trade
cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
if selected_model != 'buy_and_hold':
cerebro.addwriter(
bt.WriterFile,
csv=True,
out=f'results/{ticker_symbol}_{selected_model}_log.csv')
# run the backtest
if selected_model != 'buy_and_hold':
print(f'Starting Portfolio Value: {cerebro.broker.getvalue():.2f}')
backtest_result = cerebro.run()
strategy_info[selected_model] = backtest_result[0]
if selected_model != 'buy_and_hold':
print(f'Final Portfolio Value: {cerebro.broker.getvalue():.2f}')
plt.rcParams['font.sans-serif'] = \
['Tahoma', 'DejaVu Sans', 'Lucida Grande', 'Verdana']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.figsize'] = (18, 16)
plt.rcParams['figure.dpi'] = 300
plt.rcParams['figure.facecolor'] = 'w'
plt.rcParams['figure.edgecolor'] = 'k'
btimagefile = f'results/{ticker_symbol}_backtrader_backtesting_{selected_model}.png'
cerebro.plot()[0][0].savefig(btimagefile, dpi=300)
pyfolio is used here to build a fuller picture of the performance and risk of the Deep Neural Network (DNN) trading strategy, with summary statistics and plots showing drawdown periods, rolling returns, the drawdown underwater profile, the rolling Sharpe ratio, rolling beta and volatility against the buy-and-hold benchmark, and annual and monthly returns.
# Returns (and other information) from
# selected strategy and buy-and-hold strategy
returns = {}
positions = {}
transactions = {}
gross_lev = {}
for s in strategy_vs_benchmark:
selected_model = strategies[s]
pyfoliozer =\
strategy_info[selected_model].analyzers.getbyname('pyfolio')
returns[selected_model],\
positions[selected_model],\
transactions[selected_model],\
gross_lev[selected_model] =\
pyfoliozer.get_pf_items()
if selected_model == 'buy_and_hold':
returns[selected_model].name = 'Benchmark'
benchmark_returns = returns[selected_model]
else:
returns[selected_model].name = 'Strategy'
returns[selected_model]\
.to_csv(f'results/{ticker_symbol}_{selected_model}_returns.csv')
# Get performance statistics for selected strategy
pf.show_perf_stats(returns[selected_model])
# Selected strategy versus buy-and-hold
#
# First plot
fig, ax = plt.subplots(nrows=2, ncols=2,
figsize=(16, 9),
constrained_layout=True)
axes = ax.flatten()
# Top drawdown periods
pf.plot_drawdown_periods(returns=returns[selected_model],
fontsize=16,
ax=axes[0])
# Rolling returns
pf.plot_rolling_returns(returns=returns[selected_model],
factor_returns=benchmark_returns,
fontsize=16,
ax=axes[1],
title='Rolling returns')
# Drawdown underwater plot
pf.plot_drawdown_underwater(returns=returns[selected_model],
fontsize=16,
ax=axes[2])
# Rolling Sharpe ratio
pf.plot_rolling_sharpe(returns=returns[selected_model],
fontsize=16,
ax=axes[3])
for i in range(4):
axes[i].grid(True)
axes[i].set_xlabel('')
axes[i].set_ylabel(axes[i].get_ylabel(), fontsize=16)
if i == 0:
axes[i].legend(
axes[i].get_legend_handles_labels()[0],
[f'{selected_model.upper()}'],
loc='upper left'
)
elif i == 1:
axes[i].legend(
axes[i].get_legend_handles_labels()[0],
['Buy-and-Hold', f'{selected_model.upper()}'],
loc='upper left',
fontsize=13
)
axes[i].set_title(axes[i].get_title(), fontsize=18)
fig.suptitle(f'{selected_model.upper()} vs '\
f'Buy-and-Hold Strategy '\
f'({ticker_symbol.upper()})',
fontsize=20)
plt.tight_layout(pad=4)
plt.savefig(f'results/{ticker_symbol}_{selected_model}_pyfolio1',
dpi=300)
# Selected strategy versus buy-and-hold
#
# Second plot
fig, ax = plt.subplots(nrows=2, ncols=2,
figsize=(16, 9),
constrained_layout=True)
axes = ax.flatten()
# Rolling Beta
pf.plot_rolling_beta(returns=returns[selected_model],
factor_returns=benchmark_returns,
fontsize=16,
ax=axes[0])
# Rolling Volatility
pf.plot_rolling_volatility(returns=returns[selected_model],
factor_returns=benchmark_returns,
fontsize=16,
ax=axes[1])
# Annual Returns
pf.plot_annual_returns(returns=returns[selected_model],
fontsize=16,
ax=axes[2])
# Monthly Returns
pf.plot_monthly_returns_heatmap(returns=returns[selected_model],
ax=axes[3])
for i in range(4):
if i < 3:
if i < 2:
axes[i].set_xticklabels(axes[i].get_xticklabels(),
fontsize=14)
axes[i].grid(True)
axes[i].set_xlabel(axes[i].get_xlabel(), fontsize=16)
axes[i].set_ylabel(axes[i].get_ylabel(), fontsize=16)
axes[i].set_title(axes[i].get_title(), fontsize=18)
fig.suptitle(f'{selected_model.upper()} vs '\
f'Buy-and-Hold Strategy '\
f'({ticker_symbol.upper()})',
fontsize=20, y=0.94)
plt.tight_layout(pad=0.94)
plt.savefig(f'results/{ticker_symbol}_{selected_model}_pyfolio2',
dpi=300)
In the preliminary analyses shown here, each approach predicted day-to-day Bitcoin price movements at roughly chance level (about 50%).
| Model | Accuracy |
|---|---|
| Logistic Regression | 53% |
| Gaussian Naive Bayes | 49% |
| Support Vector Classification | 53% |
| Random Forest Classifier | 47% |
| Multi-layer Perceptron Classifier | 49% |
| Deep Neural Network (DNN) | 54% |
| Recurrent Neural Network (RNN) | 51% |
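Whether the best accuracy (54%) is genuinely better than a fair coin depends on the number of test days. A quick sketch, using a two-sided normal-approximation test and a hypothetical one-year (365-day) test window:

```python
import math

def chance_pvalue(accuracy, n_days):
    """Two-sided normal-approximation p-value for accuracy vs. 50% chance."""
    k = round(accuracy * n_days)                     # correct predictions
    z = (k - 0.5 * n_days) / math.sqrt(n_days * 0.25)
    return math.erfc(abs(z) / math.sqrt(2))          # two-sided p-value

# With ~365 test days, even 54% accuracy is consistent with chance
for acc in (0.47, 0.51, 0.54):
    print(acc, round(chance_pvalue(acc, 365), 3))
```

Under this assumption, none of the accuracies in the table would reach conventional significance, which is consistent with the chance-level interpretation above.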
Additionally, none of the machine/deep learning algorithms presented here outperforms the buy-and-hold strategy (267%); the DNN strategy (245%) comes closest.
| Strategy | Vectorized Backtesting (Returns) | Backtesting with Backtrader (Returns) |
|---|---|---|
| Logistic Regression | 90% | 17% |
| Gaussian Naive Bayes | -5% | 26% |
| Support Vector Classification | 207% | 155% |
| Random Forest | -55% | -54% |
| Multi-layer Perceptron | -17% | -41% |
| Deep Neural Network | 233% | 245% |
| Recurrent Neural Network | 156% | 120% |
| Buy-and-hold | 267% | |
| Optimal | 84339% | |
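The two benchmark rows can be reproduced directly from a close-price series: buy-and-hold is just the total price change, while the "optimal" strategy assumes perfect foresight and takes the correct side of every daily move. A sketch with made-up prices:

```python
import numpy as np

def strategy_returns(close):
    """Total returns for buy-and-hold and a perfect-foresight strategy."""
    r = np.diff(close) / close[:-1]            # simple daily returns
    buy_and_hold = close[-1] / close[0] - 1
    # Perfect foresight: long before every up day, short before every
    # down day, so each day compounds by (1 + |daily return|)
    optimal = np.prod(1 + np.abs(r)) - 1
    return buy_and_hold, optimal

# Hypothetical closing prices
close = np.array([100.0, 110.0, 99.0, 105.0, 120.0])
bh, opt = strategy_returns(close)
print(f'buy-and-hold: {bh:.0%}, optimal: {opt:.0%}')
```

Because the optimal return compounds the absolute value of every daily move, it grows explosively over long, volatile test windows, which is why its figure dwarfs every realizable strategy in the table.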
Introducing features from other information streams, such as Google Trends search interest, may help with identifying strategies that do better than the buy-and-hold strategy.
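As an illustration of how such a stream could be folded into the feature set, the sketch below joins a hypothetical search-interest series (a stand-in for what pytrends' `interest_over_time()` would return) onto a price DataFrame, lagging it by one day so that day t's feature uses only information available before day t's close:

```python
import pandas as pd

# Hypothetical price features and a hypothetical search-interest series
dates = pd.date_range('2021-01-01', periods=5, freq='D')
features = pd.DataFrame({'close': [100.0, 110.0, 99.0, 105.0, 120.0]},
                        index=dates)
trends = pd.Series([55, 60, 58, 70, 80], index=dates, name='trend_score')

# Left-join on the date index, then shift by one day to avoid
# look-ahead bias in the resulting feature
features = features.join(trends)
features['trend_score'] = features['trend_score'].shift(1)
print(features)
```

The one-day lag is the important design choice: without it, a daily-resolution external series would leak same-day information into the prediction target.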